import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Flink DataSet example: reads (name, number) rows from a CSV file, maps them to
 * {@link CustomType} objects and de-duplicates them with {@code distinct} using
 * field-name key expressions.
 */
public class Distinctwithkeyexpression
{
    /**
     * Plain data holder for one CSV row.
     *
     * <p>Field-name key expressions such as {@code distinct("aName", "aNumber")} only
     * work on types Flink recognizes as POJOs: a public class with a public no-argument
     * constructor whose fields are public (or exposed through getters/setters). Without
     * the no-argument constructor the type is treated as a generic type and cannot be
     * used with key expressions.
     */
    static public class CustomType {
        public String aName;
        public int aNumber;

        /** Public no-argument constructor required by Flink's POJO rules. */
        public CustomType()
        {
        }

        /**
         * Creates a fully populated instance.
         *
         * @param a value for {@code aName}
         * @param b value for {@code aNumber}; unboxed on assignment, so it must not be null
         */
        private CustomType(String a,Integer b)
        {
            this.aName=a;
            this.aNumber=b;
        }
    }

    /**
     * Runs the example: prints the first record's fields, then prints the data set
     * de-duplicated on {@code aName} and {@code aNumber}.
     *
     * @param args unused
     * @throws Exception if reading the input or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The input file is resolved relative to the JVM working directory.
        String rootPath = System.getProperty("user.dir") + "/dataset_api/Java";

        DataSet<CustomType> input = env
                .readCsvFile("file:///" + rootPath + "/" + "cog1.csv")
                .types(String.class,Integer.class)
                .map(new MapFunction<Tuple2<String, Integer>, CustomType>()
                {
                    @Override
                    public CustomType map(Tuple2<String, Integer> tuple2) throws Exception
                    {
                        return new CustomType(tuple2.f0,tuple2.f1);
                    }
                });

        // collect() runs the job each time it is called, so fetch the first
        // record once and reuse it for both printouts.
        CustomType first = input.collect().get(0);
        System.out.println(first.aName);
        System.out.println(first.aNumber);

        // De-duplicate on both fields via key expressions; this relies on
        // CustomType satisfying Flink's POJO rules (see the class comment).
        DataSet<CustomType> output = input.distinct("aName", "aNumber");
        output.print();
    }
}